{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Asyncio tutorial\n",
    "\n",
    "## A normal way in python\n",
    "**Firstly, let's see a running time in a normal way.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Start job  1\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Job  1  takes  1  s\nStart job  2\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Job  2  takes  2  s\nNO async total time :  3.0082271099090576\n"
     ]
    }
   ],
   "source": [
    "import time\n",
    "\n",
    "\n",
    "def job(t):\n",
    "    print('Start job ', t)\n",
    "    time.sleep(t)               # wait for \"t\" seconds\n",
    "    print('Job ', t, ' takes ', t, ' s')\n",
    "    \n",
    "\n",
    "def main():\n",
    "    [job(t) for t in range(1, 3)]\n",
    "    \n",
    "    \n",
    "t1 = time.time()\n",
    "main()\n",
    "print(\"NO async total time : \", time.time() - t1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Translate above to async\n",
    "**Now, let's see the running time using asyncio**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Start job  1\nStart job  2\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Job  1  takes  1  s\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Job  2  takes  2  s\nAsync total time :  2.0054221153259277\n"
     ]
    }
   ],
   "source": [
    "import asyncio\n",
    "\n",
    "\n",
    "async def job(t):\n",
    "    print('Start job ', t)\n",
    "    await asyncio.sleep(t)          # wait for \"t\" seconds, it will look for another job while await\n",
    "    print('Job ', t, ' takes ', t, ' s')\n",
    "    \n",
    "\n",
    "async def main(loop):\n",
    "    tasks = [loop.create_task(job(t)) for t in range(1, 3)]     # just create, not run job\n",
    "    await asyncio.wait(tasks)                                   # run jobs and wait for all tasks done\n",
    "\n",
    "t1 = time.time()\n",
    "loop = asyncio.get_event_loop()\n",
    "loop.run_until_complete(main(loop))\n",
    "# loop.close()                          # Ipython notebook gives error if close loop\n",
    "print(\"Async total time : \", time.time() - t1)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## A normal way in crawling webpage\n",
    "**We can use this machanism in requesting for a website. Await for download a page and switch to do another job.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "https://morvanzhou.github.io/\nhttps://morvanzhou.github.io/\nNormal total time: 0.3869960308074951\n"
     ]
    }
   ],
   "source": [
    "import requests\n",
    "\n",
    "URL = 'https://morvanzhou.github.io/'\n",
    "\n",
    "\n",
    "def normal():  \n",
    "    for i in range(2):\n",
    "        r = requests.get(URL)\n",
    "        url = r.url\n",
    "        print(url)\n",
    "    \n",
    "t1 = time.time()\n",
    "normal()\n",
    "print(\"Normal total time:\", time.time()-t1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Translate above to async using aiohttp\n",
    "**We have to install another useful package called [aiohttp](https://aiohttp.readthedocs.io/en/stable/index.html). You can simply run \"pip3 install aiohttp\" in your terminal.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['https://morvanzhou.github.io/', 'https://morvanzhou.github.io/']\nAsync total time: 0.11447715759277344\n"
     ]
    }
   ],
   "source": [
    "import aiohttp\n",
    "\n",
    "\n",
    "async def job(session):\n",
    "    response = await session.get(URL)\n",
    "    return str(response.url)\n",
    "\n",
    "\n",
    "async def main(loop):\n",
    "    async with aiohttp.ClientSession() as session:\n",
    "        tasks = [loop.create_task(job(session)) for _ in range(2)]\n",
    "        finished, unfinished = await asyncio.wait(tasks)\n",
    "        all_results = [r.result() for r in finished]        # get return from job\n",
    "        print(all_results)\n",
    "    \n",
    "t1 = time.time()\n",
    "loop = asyncio.get_event_loop()\n",
    "loop.run_until_complete(main(loop))\n",
    "# loop.close()                      # Ipython notebook gives error if close loop\n",
    "print(\"Async total time:\", time.time() - t1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Compare async with multiprocessing\n",
    "**The following code scrape my website with async**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\nAsync Crawling...\n\nDistributed Parsing...\n\nAnalysing...\n\nAsync Crawling...\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\nDistributed Parsing...\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\nAnalysing...\n\nAsync Crawling...\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\nDistributed Parsing...\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\nAnalysing...\nAsync total time:  7.21798300743103\n"
     ]
    }
   ],
   "source": [
    "import aiohttp\n",
    "import asyncio\n",
    "import time\n",
    "from bs4 import BeautifulSoup\n",
    "from urllib.request import urljoin\n",
    "import re\n",
    "import multiprocessing as mp\n",
    "\n",
    "# base_url = \"https://morvanzhou.github.io/\"\n",
    "base_url = \"http://127.0.0.1:4000/\"\n",
    "\n",
    "# DON'T OVER CRAWL THE WEBSITE OR YOU MAY NEVER VISIT AGAIN\n",
    "if base_url != \"http://127.0.0.1:4000/\":\n",
    "    restricted_crawl = True\n",
    "else:\n",
    "    restricted_crawl = False\n",
    "        \n",
    "        \n",
    "seen = set()\n",
    "unseen = set([base_url])\n",
    "\n",
    "\n",
    "def parse(html):\n",
    "    soup = BeautifulSoup(html, 'lxml')\n",
    "    urls = soup.find_all('a', {\"href\": re.compile('^/.+?/$')})\n",
    "    title = soup.find('h1').get_text().strip()\n",
    "    page_urls = set([urljoin(base_url, url['href']) for url in urls])\n",
    "    url = soup.find('meta', {'property': \"og:url\"})['content']\n",
    "    return title, page_urls, url\n",
    "\n",
    "\n",
    "async def crawl(url, session):\n",
    "    r = await session.get(url)\n",
    "    html = await r.text()\n",
    "    await asyncio.sleep(0.1)        # slightly delay for downloading\n",
    "    return html\n",
    "\n",
    "\n",
    "async def main(loop):\n",
    "    pool = mp.Pool(8)               # slightly affected\n",
    "    async with aiohttp.ClientSession() as session:\n",
    "        count = 1\n",
    "        while len(unseen) != 0:\n",
    "            print('\\nAsync Crawling...')\n",
    "            tasks = [loop.create_task(crawl(url, session)) for url in unseen]\n",
    "            finished, unfinished = await asyncio.wait(tasks)\n",
    "            htmls = [f.result() for f in finished]\n",
    "            \n",
    "            print('\\nDistributed Parsing...')\n",
    "            parse_jobs = [pool.apply_async(parse, args=(html,)) for html in htmls]\n",
    "            results = [j.get() for j in parse_jobs]\n",
    "            \n",
    "            print('\\nAnalysing...')\n",
    "            seen.update(unseen)\n",
    "            unseen.clear()\n",
    "            for title, page_urls, url in results:\n",
    "                # print(count, title, url)\n",
    "                unseen.update(page_urls - seen)\n",
    "                count += 1\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    t1 = time.time()\n",
    "    loop = asyncio.get_event_loop()\n",
    "    loop.run_until_complete(main(loop))\n",
    "    # loop.close()\n",
    "    print(\"Async total time: \", time.time() - t1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Here we try multiprocessing and test the speed**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\nDistributed Crawling...\n\nDistributed Parsing...\n\nAnalysing...\n\nDistributed Crawling...\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\nDistributed Parsing...\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\nAnalysing...\n\nDistributed Crawling...\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\nDistributed Parsing...\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\nAnalysing...\nTotal time: 11.5 s\n"
     ]
    }
   ],
   "source": [
    "from urllib.request import urlopen, urljoin\n",
    "from bs4 import BeautifulSoup\n",
    "import multiprocessing as mp\n",
    "import re\n",
    "import time\n",
    "\n",
    "\n",
    "def crawl(url):\n",
    "    response = urlopen(url)\n",
    "    time.sleep(0.1)             # slightly delay for downloading\n",
    "    return response.read().decode()\n",
    "\n",
    "\n",
    "def parse(html):\n",
    "    soup = BeautifulSoup(html, 'lxml')\n",
    "    urls = soup.find_all('a', {\"href\": re.compile('^/.+?/$')})\n",
    "    title = soup.find('h1').get_text().strip()\n",
    "    page_urls = set([urljoin(base_url, url['href']) for url in urls])\n",
    "    url = soup.find('meta', {'property': \"og:url\"})['content']\n",
    "    return title, page_urls, url\n",
    "\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    # base_url = 'https://morvanzhou.github.io/'\n",
    "    base_url = \"http://127.0.0.1:4000/\"\n",
    "        \n",
    "    # DON'T OVER CRAWL THE WEBSITE OR YOU MAY NEVER VISIT AGAIN\n",
    "    if base_url != \"http://127.0.0.1:4000/\":\n",
    "        restricted_crawl = True\n",
    "    else:\n",
    "        restricted_crawl = False\n",
    "        \n",
    "    unseen = set([base_url,])\n",
    "    seen = set()\n",
    "\n",
    "    pool = mp.Pool(8)                       # number strongly affected\n",
    "    count, t1 = 1, time.time()\n",
    "    while len(unseen) != 0:                 # still get some url to visit\n",
    "        if restricted_crawl and len(seen) > 20:\n",
    "            break\n",
    "        print('\\nDistributed Crawling...')\n",
    "        crawl_jobs = [pool.apply_async(crawl, args=(url,)) for url in unseen]\n",
    "        htmls = [j.get() for j in crawl_jobs]                                       # request connection\n",
    "        htmls = [h for h in htmls if h is not None]     # remove None\n",
    "\n",
    "        print('\\nDistributed Parsing...')\n",
    "        parse_jobs = [pool.apply_async(parse, args=(html,)) for html in htmls]\n",
    "        results = [j.get() for j in parse_jobs]                                     # parse html\n",
    "\n",
    "        print('\\nAnalysing...')\n",
    "        seen.update(unseen)\n",
    "        unseen.clear()\n",
    "\n",
    "        for title, page_urls, url in results:\n",
    "            # print(count, title, url)\n",
    "            count += 1\n",
    "            unseen.update(page_urls - seen)\n",
    "\n",
    "    print('Total time: %.1f s' % (time.time()-t1, ))\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
